package com.bawei.persona.realtime.app.func;

import com.alibaba.fastjson.JSONObject;

import com.bawei.persona.realtime.common.GmallConfig;
import com.bawei.persona.realtime.util.ThreadPoolUtil;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;


/**
 * 上海大数据学院
 * 项目规划及管理：李剑
 * 技术指导及需求分析：郭洵
 * 编程：楚志高
 *
 * @author bawei  bigdata sh
 * @since 2021-06-11
 */
public class DimAsyncFunctionNew extends RichAsyncFunction< Tuple2<String,String>,String > {

    // 数据库连接

    private transient Connection conn = null;
    private transient Table table = null;


    //线程池对象的父接口生命（多态）
    private ExecutorService executorService;

    //维度的表名
    private String tableName;

    private String  mainKey;

   //传入的参数为表明与主键 输出的结果为hbase 里面维度表的数据
    public DimAsyncFunctionNew(Tuple2<String,String> param) {

        this.tableName = param.f0 ;
        this.mainKey = param.f1 ;
    }

    //打开 hbase 的数据库
    @Override
    public void open(Configuration parameters) throws Exception {
        //初始化线程池对象
        System.out.println("初始化线程池对象");
        executorService = ThreadPoolUtil.getInstance();
        //对数据库进行连接
        super.open(parameters);

        org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
        //链接服务器
        conf.set("hbase.zookeeper.quorum", GmallConfig.HBASE_ZOOKEEPER_QUORUM);
        conf.set("hbase.zookeeper.property.clientPort", GmallConfig.HBASE_ZOOKEEPER_PROPERTY_CLIENTPORT);
        if (null == conn) {
            this.conn = ConnectionFactory.createConnection(conf);
        }

    }





    @Override
    public void asyncInvoke(Tuple2<String, String> params, ResultFuture<String> resultFuture) throws Exception {
        executorService.submit(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //发送异步请求
                            long start = System.currentTimeMillis();
                            //从流中事实数据获取key
                            String key =params.f1 ; //主键
                            String table1 = params.f0; //表名
                            //根据维度的主键到维度表中进行查询
                            // scan 范围数据
                            Get get = new Get(Bytes.toBytes("row"));
                            Result res = table.get(get);
                            //设置将来转化为的主键 ，将map 值转化为json 对象
                            Map resultMap = new HashMap() ;
                            resultMap.put("id", key) ;
                            for (Cell cell:res.rawCells()){
                                //根据表明把所有的数据进行包装从json 类型
                                resultMap.put("tablename", table1) ;
                                //循环将hbase表中的数据给压缩进去
                                byte[] bytes = CellUtil.cloneQualifier(cell);
                                byte[] value = CellUtil.cloneValue(cell);
                                String pkname  = Bytes.toString(bytes);
                                String pkvalue  = Bytes.toString(value);
                                resultMap.put(pkname, pkvalue) ;
                                //将map 结构转化为json 对象

                            }
                            String jsonString = JSONObject.toJSONString(resultMap);

//                            JSONObject dimInfoJsonObj = DimUtil.getDimInfo(tableName, key);
//
//
//                            //System.out.println("维度数据Json格式：" + dimInfoJsonObj);
//
//                            if(dimInfoJsonObj != null){
//                                //维度关联  流中的事实数据和查询出来的维度数据进行关联
//                                join(obj,dimInfoJsonObj);
//                            }
                            //System.out.println("维度关联后的对象:" + obj);
                            long end = System.currentTimeMillis();
                            System.out.println("异步维度查询耗时" +(end -start)+"毫秒");
                            //将关联后的数据数据继续向下传递
//                            resultFuture.complete(Arrays.asList(obj));
                        } catch (Exception e) {
                            e.printStackTrace();
                            throw new RuntimeException(tableName + "维度异步查询失败");
                        }
                    }
                }
        );


    }

    @Override
    public void close() throws Exception {

        super.close();

        if (table != null){
            table.close();
        }

        if (conn != null){
            conn.close();
        }


    }

    @Override
    public void timeout(Tuple2<String, String> input, ResultFuture<String > resultFuture) throws Exception {

    }
}
